package com.buydeem.share.rabbit.confirm;

import com.buydeem.share.rabbit.Utils;
import com.rabbitmq.client.*;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

/**
 * Demo of a consumer that acknowledges messages manually.
 *
 * <p>The consumer subscribes to {@code confirm.queue} with auto-ack disabled. Each delivery is
 * acked once it has been processed successfully; if processing fails, the delivery is nacked
 * with {@code requeue=true} so the broker puts it back on the queue.
 *
 * @author zengchao
 * @date 2021-07-08 14:50:50
 */
@Slf4j
public class ConsumerConfirmDemo {

    /**
     * Opens a connection and a channel and starts consuming from {@code confirm.queue}
     * with manual acknowledgements.
     *
     * <p>The connection and channel are not closed here; the consumer keeps receiving
     * deliveries on them after this method returns.
     *
     * @param args command-line arguments (unused)
     * @throws IOException      if the connection, channel or subscription cannot be set up
     * @throws TimeoutException if connecting to the broker times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = Utils.getFactory();
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        DeliverCallback deliverCallback = (consumerTag, message) -> handleDelivery(channel, message);

        CancelCallback cancelCallback = consumerTag -> log.info("取消订阅：{}", consumerTag);

        // autoAck = false: every delivery must be explicitly acked or nacked by handleDelivery.
        channel.basicConsume("confirm.queue", false, deliverCallback, cancelCallback);
    }

    /**
     * Processes one delivery and then acks it, or nacks it with requeue when processing fails.
     *
     * <p>Only the processing step is guarded by the try/catch. An {@link IOException} raised
     * while sending the ack/nack itself is propagated to the caller instead of being treated
     * as a processing failure (which would otherwise trigger a nack for a delivery whose ack
     * just failed on the same channel).
     *
     * @param channel the channel the delivery was received on; used for ack/nack
     * @param message the delivery to process
     * @throws IOException if sending the ack or nack fails
     */
    private static void handleDelivery(Channel channel, Delivery message) throws IOException {
        long deliveryTag = message.getEnvelope().getDeliveryTag();
        try {
            String messageContent = new String(message.getBody(), StandardCharsets.UTF_8);
            // The literal payload "error" simulates a business failure for the demo.
            if ("error".equals(messageContent)) {
                throw new RuntimeException("业务异常");
            }
            log.info("收到的消息内容：{}", messageContent);
        } catch (RuntimeException e) {
            // Pass the exception to the logger so the cause and stack trace are not lost.
            log.warn("消费消息失败!重回队列!", e);
            // multiple = false, requeue = true.
            // NOTE(review): a payload that always fails (e.g. "error") will be requeued on
            // every attempt; consider a retry limit or dead-lettering outside of a demo.
            channel.basicNack(deliveryTag, false, true);
            return;
        }
        // multiple = false: acknowledge only this delivery.
        channel.basicAck(deliveryTag, false);
    }
}
